home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
pyxmpp
/
xmlextra.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
11KB
|
398 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
__revision__ = '$Id: xmlextra.py,v 1.15 2004/10/11 18:33:51 jajcus Exp $'
__docformat__ = 'restructuredtext en'
import sys
import libxml2
import threading
import re
from pyxmpp.exceptions import StreamParseError
common_doc = libxml2.newDoc('1.0')
common_root = common_doc.newChild(None, 'root', None)
COMMON_NS = 'http://pyxmpp.jajcus.net/xmlns/common'
common_ns = common_root.newNs(COMMON_NS, None)
common_root.setNs(common_ns)
common_doc.setRootElement(common_root)
class StreamHandler:
def __init__(self):
pass
def _stream_start(self, _doc):
doc = libxml2.xmlDoc(_doc)
self.stream_start(doc)
def _stream_end(self, _doc):
doc = libxml2.xmlDoc(_doc)
self.stream_end(doc)
def _stanza(self, _doc, _node):
doc = libxml2.xmlDoc(_doc)
node = libxml2.xmlNode(_node)
self.stanza(doc, node)
def stream_start(self, doc):
print >>sys.stderr, 'Unhandled stream start:', `doc.serialize()`
def stream_end(self, doc):
print >>sys.stderr, 'Unhandled stream end', `doc.serialize()`
def stanza(self, _unused, node):
print >>sys.stderr, 'Unhandled stanza', `node.serialize()`
def error(self, descr):
raise StreamParseError, descr
try:
import _xmlextra
from _xmlextra import error
except ImportError:
print >>sys.stderr, 'WARNING: using SLOW xmlextra'
class error(Exception):
pass
def _escape(data):
data = data.replace('&', '&')
data = data.replace('<', '<')
data = data.replace('>', '>')
data = data.replace("'", ''')
data = data.replace('"', '"')
return data
class _SAXCallback(libxml2.SAXCallback):
def __init__(self, handler):
self._handler = handler
self._head = ''
self._tail = ''
self._current = ''
self._level = 0
self._doc = None
self._root = None
def cdataBlock(self, data):
if self._level > 1:
self._current += _escape(data)
def characters(self, data):
if self._level > 1:
self._current += _escape(data)
def comment(self, content):
pass
def endDocument(self):
pass
def endElement(self, tag):
self._current += '</%s>' % (tag,)
self._level -= 1
if self._level == 1:
xml = self._head + self._current + self._tail
doc = libxml2.parseDoc(xml)
try:
node = doc.getRootElement().children
try:
node1 = node.docCopyNode(self._doc, 1)
try:
self._root.addChild(node1)
self._handler.stanza(self._doc, node1)
except Exception:
None if self._level > 1 else self
None if self._level > 1 else self
import traceback as traceback
traceback.print_exc()
except:
None if self._level > 1 else self
finally:
node1.unlinkNode()
node1.freeNode()
del node1
finally:
del node
finally:
doc.freeDoc()
else:
xml = self._head + self._tail
doc = libxml2.parseDoc(xml)
try:
self._handler.stream_end(self._doc)
self._doc.freeDoc()
self._doc = None
self._root = None
finally:
doc.freeDoc()
def error(self, msg):
self._handler.error(msg)
fatalError = error
ignorableWhitespace = characters
def reference(self, name):
self._current += '&' + name + ';'
def startDocument(self):
pass
def startElement(self, tag, attrs):
s = '<' + tag
if attrs:
for a, v in attrs.items():
s += " %s='%s'" % (a, _escape(v))
s += '>'
if self._level == 0:
self._head = s
self._tail = '</%s>' % (tag,)
xml = self._head + self._tail
self._doc = libxml2.parseDoc(xml)
self._handler.stream_start(self._doc)
self._root = self._doc.getRootElement()
elif self._level == 1:
self._current = s
else:
self._current += s
self._level += 1
def warning(self):
pass
class _PythonReader:
def __init__(self, handler):
self.handler = handler
self.sax = _SAXCallback(handler)
self.parser = libxml2.createPushParser(self.sax, '', 0, 'stream')
def feed(self, data):
return self.parser.parseChunk(data, len(data), 0)
_create_reader = _PythonReader
def _get_ns(node):
try:
return node.ns()
except libxml2.treeError:
return None
def replace_ns(node, old_ns, new_ns):
if old_ns is not None:
old_ns_uri = old_ns.content
old_ns_prefix = old_ns.name
else:
old_ns_uri = None
old_ns_prefix = None
ns = _get_ns(node)
if ns is None and old_ns is None:
node.setNs(new_ns)
elif ns and ns.content == old_ns_uri and ns.name == old_ns_prefix:
node.setNs(new_ns)
p = node.properties
while p:
ns = _get_ns(p)
if ns is None and old_ns is None:
p.setNs(new_ns)
if ns and ns.content == old_ns_uri and ns.name == old_ns_prefix:
p.setNs(new_ns)
p = p.next
n = node.children
while n:
if n.type == 'element':
skip_element = False
try:
nsd = n.nsDefs()
except libxml2.treeError:
nsd = None
while nsd:
if nsd.name == old_ns_prefix:
skip_element = True
break
nsd = nsd.next
if not skip_element:
replace_ns(n, old_ns, new_ns)
n = n.next
pure_python = True
_create_reader = _xmlextra.sax_reader_new
def replace_ns(node, old_ns, new_ns):
if old_ns is None:
old_ns__o = None
else:
old_ns__o = old_ns._o
if new_ns is None:
new_ns__o = None
else:
new_ns__o = new_ns._o
if node is None:
node__o = None
else:
node__o = node._o
_xmlextra.replace_ns(node__o, old_ns__o, new_ns__o)
if old_ns__o:
_xmlextra.remove_ns(node__o, old_ns__o)
pure_python = False
def get_node_ns(xmlnode):
try:
return xmlnode.ns()
except libxml2.treeError:
return None
def get_node_ns_uri(xmlnode):
ns = get_node_ns(xmlnode)
if ns:
return unicode(ns.getContent(), 'utf-8')
else:
return None
def xml_node_iter(nodelist):
node = nodelist
while node:
yield node
node = node.next
def xml_element_iter(nodelist):
node = nodelist
while node:
if node.type == 'element':
yield node
node = node.next
def xml_element_ns_iter(nodelist, ns_uri):
node = nodelist
while node:
if node.type == 'element' and get_node_ns_uri(node) == ns_uri:
yield node
node = node.next
evil_characters_re = re.compile('[\\000-\\010\\013\\014\\016-\\037]', re.UNICODE)
utf8_replacement_char = u'\xef\xbf\xbd'.encode('utf-8')
def remove_evil_characters(s):
if isinstance(s, unicode):
return evil_characters_re.sub(u'\xef\xbf\xbd', s)
else:
return evil_characters_re.sub(utf8_replacement_char, s)
bad_nsdef_replace_re = re.compile('^([^<]*\\<[^><]*\\s+)(xmlns=((\\"[^\\"]*\\")|(\\\'[^\\\']*\\\')))')
def safe_serialize(xmlnode):
try:
ns = xmlnode.ns()
except libxml2.treeError:
ns = None
try:
nsdef = xmlnode.nsDefs()
except libxml2.treeError:
nsdef = None
s = xmlnode.serialize(encoding = 'UTF-8')
while nsdef:
if nsdef.name is None:
if not ns or (nsdef.name, nsdef.content) != (ns.name, ns.content):
s = bad_nsdef_replace_re.sub('\\1', s, 1)
break
nsdef = nsdef.next
s = remove_evil_characters(s)
return s
class StreamReader:
def __init__(self, handler):
self.reader = _create_reader(handler)
self.lock = threading.RLock()
self.in_use = 0
def doc(self):
ret = self.reader.doc()
if ret:
return libxml2.xmlDoc(ret)
else:
return None
def feed(self, s):
self.lock.acquire()
if self.in_use:
self.lock.release()
raise StreamParseError, 'StreamReader.feed() is not reentrant!'
self.in_use = 1
try:
return self.reader.feed(s)
finally:
self.in_use = 0
self.lock.release()